// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#ifndef ROCKSDB_FLUSHMANAGER_H
#define ROCKSDB_FLUSHMANAGER_H

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "range_arena_rebuild.h"
#include "range_arena.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "options/db_options.h"
#include "options/options_helper.h"
#include "rocksdb/env.h"
#include "util/file_reader_writer.h"
#include "util/filename.h"
#include "pure_mem/rangearena/range_arena_manager.h"

namespace rocksdb {
// The flush data module manages the flush operation of all memory blocks in
// the memory engine, as well as the creation, writing, deletion and recovery
// of the corresponding disk files.
//
// A FlushManager is neither default-constructible nor copyable.
class FlushManager {
 public:
  // A flush node holds a memory block, the corresponding disk file for the
  // memory block, and data about the memory block's data flush.
  //
  // NOTE(review): the node stores `db_options` and `opt_env_options` as
  // references, so both arguments must outlive the node.
  class FlushNode {
   public:
    // Binds the node to the memory block `ra`. `flush_dir` is taken by
    // non-const reference to match the out-of-line definition.
    FlushNode(RangeArena *ra, Env* env, std::string& flush_dir,
              const ImmutableDBOptions& db_options,
              const EnvOptions& opt_env_options, Logger* info_log);
    ~FlushNode();

    // State queries on the node / its memory block. The exact criteria are
    // defined in the implementation file.
    bool RangeArenaIsHandling() const;
    bool NeedFlush();
    bool NeedDelete() const;

    // Hands the next entry to flush back through `entry` and reports failures
    // through `s`. NOTE(review): the meaning of the bool result is defined in
    // the implementation file -- confirm before relying on it.
    bool GetFlushData(const char* &entry, Status& s);

    // Disk-file lifecycle and write operations for this node; each reports its
    // outcome as a Status.
    Status CreateFile(uint64_t flush_number);
    Status CloseFile();
    Status Append();
    Status DeleteFile();

    // Stages `entry` for a later write (presumably into flush_data_map_ --
    // confirm in the implementation file).
    void AddToFlushMap(const char *entry);

    // Returns the recorded time of the last flush of this node.
    int64_t GetLastFlushTime() const { return last_flush_time_; }

   private:
    // Strict-weak-ordering predicate over slices; has the signature required
    // by the comparator of flush_data_map_.
    static bool sliceLessCompare(const Slice& left_key, const Slice& right_key);

    Env* env_;
    Logger* info_log_;
    // Borrowed, not owned (see the lifetime note on the class).
    const ImmutableDBOptions& db_options_;
    const EnvOptions& opt_env_options_;

    // The memory block served by this node (non-owning raw pointer).
    RangeArena* range_arena_;
    std::shared_ptr<SstFileWriter> sst_file_writer_;
    std::string flush_dir_;
    std::vector<std::string> file_vector_;
    static const InternalKeyComparator internal_key_comparator_;

    // Flush progress bookkeeping.
    size_t kv_flush_last_size_;
    size_t node_data_size_;
    int64_t last_flush_time_;

    // Entries staged for flushing, ordered by a function-pointer comparator
    // supplied at construction time.
    std::map<Slice, Slice, bool (*)(const Slice& left_key, const Slice& right_key)> flush_data_map_;
  };
  using FlushQueue = std::deque<FlushNode*>;

  FlushManager() = delete;
  // NOTE(review): the parameter is named `db_options` but carries the
  // column-family options (ImmutableCFOptions).
  FlushManager(uint32_t column_family_id,
               const ImmutableCFOptions& db_options,
               const std::string& flush_dir_path,
               MultiRangeArena* multi_range_arenas);
  FlushManager(const FlushManager&) = delete;
  FlushManager& operator=(const FlushManager&) = delete;
  ~FlushManager();
  // The memory block flush logic uses a separate background thread to process
  // the flush operation of the memory block sequentially.
  Status Flush();

 private:
  void exitFlushBGThread();
  void addFlushNode(RangeArena* ra);
  Status DeleteWAL(FlushNode* node);
  // Parses `filename` and stores the extracted number in `*number`.
  // NOTE(review): presumably returns false when the name is not a log file
  // name -- confirm in the implementation file.
  static bool parseLogFileName(const std::string& filename, uint64_t* number);

  uint32_t column_family_id_ = 0;
  Env* env_;
  Logger* info_log_;
  const ImmutableDBOptions immutable_db_options_;
  const EnvOptions env_options_;
  EnvOptions opt_env_options_;

  // Upper bound on the interval between flushes of a memory block.
  // NOTE(review): presumably expressed in seconds (30 minutes) -- confirm.
  const int64_t kMemFlushMaxInterval = 1800;
  std::string flush_dir_;
  MultiRangeArena* multi_range_arenas_ = nullptr;
  // Flush queue is used to maintain the data structure of memory blocks
  // that need to be flushed. It is responsible for the orderly organization of
  // disk flush nodes. The flush queue is managed by flush manager for
  // orderly processing (such as drop disk processing, delete processing, etc.).
  FlushQueue flush_queue_;

  // Background flush thread and the primitives used to put it to sleep and
  // wake it up.
  std::thread bg_thread_;
  std::condition_variable flush_sleep_cv_;
  std::mutex flush_sleep_mutex_;
  // Explicitly zero-initialized: a default-constructed std::atomic holds an
  // indeterminate value before C++20. A constructor mem-initializer, if any,
  // still takes precedence over this default.
  std::atomic<uint64_t> flush_number_{0};

  // NOTE(review): plain bools that appear to be shared with bg_thread_;
  // presumably only accessed under flush_sleep_mutex_ -- confirm.
  bool flush_sleep_notified_ = false;
  bool flush_bg_thread_ = true;

  // Reporter handed to log::Reader; it logs every corruption and remembers the
  // first error in `*status`.
  struct LogReporter : public log::Reader::Reporter {
    Env* env = nullptr;
    Logger* info_log = nullptr;
    const char* fname = nullptr;

    Status* status = nullptr;
    bool ignore_error = false;  // true if db_options_.paranoid_checks==false

    // Logs a warning about the `bytes` dropped because of `s`, then stores `s`
    // in `*status` unless an earlier error is already recorded there. A
    // reporter without a status sink only logs.
    // NOTE(review): the "[WalManager]" tag looks inherited from WalManager;
    // kept byte-identical so existing log filters keep matching.
    void Corruption(size_t bytes, const Status& s) override {
      ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
                     (this->ignore_error ? "(ignoring error) " : ""),
                     (this->fname != nullptr ? this->fname : ""),
                     static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != nullptr && this->status->ok()) {
        // only keep the first error
        *this->status = s;
      }
    }
  };

  // Shared by all FlushManager instances.
  // NOTE(review): presumably keyed by column family id -- confirm.
  static std::unordered_map<uint32_t, int64_t> flush_times_;
};

}  // namespace rocksdb
#endif  // ROCKSDB_FLUSHMANAGER_H
